Skip to content

[CELEBORN-2270] Fix problem with eviction to tiered storage during partition split#3610

Open
eolivelli wants to merge 4 commits intoapache:mainfrom
eolivelli:CELEBORN-2270-fix-partition-split
Open

[CELEBORN-2270] Fix problem with eviction to tiered storage during partition split#3610
eolivelli wants to merge 4 commits intoapache:mainfrom
eolivelli:CELEBORN-2270-fix-partition-split

Conversation

@eolivelli
Copy link
Contributor

@eolivelli eolivelli commented Feb 25, 2026

NOTE: this PR is stacked on top of #3608

Please consider only 756d25e

What changes were proposed in this pull request?

Handle the eviction to a different location type.

Why are the changes needed?

Because it may happen that a MEMORY file is to be evicted to another storage type (i.e. S3). This does not work.

Usually, as described in tests in #3608 when you have tiered storage, the primary partition type is usually not MEMORY, but it may happen that during a partition split the client decides to use MEMORY.

This patch fixes the problem on the worker side.
An alternative fix would be to change the behavior of the client, and simulate what the master does when offering slots.
Such a change would be more involved and it won't make the server side resilient to this scenario.

Does this PR resolve a correctness bug?

No.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • New integration tests
  • Manual testing on real k8s cluster with S3

@SteNicholas
Copy link
Member

@eolivelli, #3608 has already been merged. Please rebase the latest main branch.

@eolivelli
Copy link
Contributor Author

@eolivelli, #3608 has already been merged. Please rebase the latest main branch.

rebased


val maxSize = order.get.length
for (i <- tryCreateFileTypeIndex until maxSize) {
val firstIndex = tryCreateFileTypeIndex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this? The change is same as the current implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because the value of "firstIndex" is logged below

if (file != null) {
return file
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert this unnecessary change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

}

def tryCreateFileByType(storageInfoType: StorageInfo.Type): TierWriterBase = {
val overrideType = if (evict) storageInfoType else location.getStorageInfo.getType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a scenario where eviction does not occur, should overrideType also be storageInfoType?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it this way in order to not change the behavior of the code on the evict=false

this is code is really tricky, because it handles two things at once:

  • the wanted "storageInfoType" from the policy
  • the available types for the location (see the 'switch' on location.getStorageInfo....))

in theory the value for location.getStorageInfo.getType should be the same as storageInfoType when evict=false, but not for MEMORY

code is here:

val tryCreateFileTypeIndex =
      if (evict) {
        0
      } else {
        // keep the old behavior, always try to use memory if worker
        // has configured to use memory storage, because slots allocator
        // will not allocate slots on memory storage
        if (order.exists(_.contains(StorageInfo.Type.MEMORY.name()))) {
          order.get.indexOf(StorageInfo.Type.MEMORY.name())
        } else {
          order.get.indexOf(
            partitionDataWriterContext.getPartitionLocation.getStorageInfo.getType.name())
        }
      }

I am not still super familiar with this code (even if I debugged it many many times 🙄 ) and I did not want to break it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suppose location.getStorageInfo.getType is set to HDD and createFileOrder is [HDD, S3], if the local HDD fails to create a file, storageInfoType will switch to S3. See following code segment:

for (i <- tryCreateFileTypeIndex until maxSize) {
  val storageStr = order.get(i)
  val storageInfoType = StorageInfo.fromStrToType(storageStr)
  val file = tryCreateFileByType(storageInfoType)
  if (file != null) {
    return file
  }
}

However, when storageInfoType is set to S3, the method createDiskFile still uses location.getStorageInfo.getType as HDD, which seems incorrect. Therefore, StorageType seems should always be overrideType.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants